
Conversation

@DaveCTurner (Contributor) commented on Jan 6, 2023

`BroadcastReplicationAction` derivatives (`POST /<indices>/_refresh` and `POST /<indices>/_flush`) are pretty inefficient when targeting high shard counts due to how `TransportBroadcastReplicationAction` works:

- It computes the list of all target shards up-front on the calling (transport) thread.

- It eagerly sends one request for every target shard in a tight loop on the calling (transport) thread.

- It accumulates responses in a `CopyOnWriteArrayList`, which takes quadratic work to populate, even though nothing reads this list until it's fully populated.

- It then mostly discards the accumulated responses, keeping only the total number of shards, the number of successful shards, and a list of any failures.

- Each failure is wrapped up in a `ReplicationResponse.ShardInfo.Failure` but then unwrapped at the end to be re-wrapped in a `DefaultShardOperationFailedException`.

This commit fixes all this:

- It avoids allocating a list of all target shards, instead iterating over the target indices and generating shard IDs on the fly.

- The computation of the list of shards, and the sending of the per-shard requests, now happens on the relevant threadpool (`REFRESH` or `FLUSH`) rather than a transport thread (see #92730: `NodeClient#executeLocally` always executes action on the calling thread).

- The per-shard requests are now throttled, with a meaningful yet fairly generous concurrency limit of `#(data nodes) * 10`.

- Rather than accumulating the full responses for later processing, we track the counts and failures directly.

- The failures are tracked in a regular `ArrayList`, avoiding the accidentally-quadratic complexity.

- The failures are tracked in their final form, skipping the unwrap-and-rewrap step at the end.

A minimal sketch of the resulting fan-out is included below.

Relates #77466
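
Below is a minimal, self-contained sketch of that fan-out shape, using only the JDK. All names (`BroadcastSketch`, `ShardRef`, `ShardFailure`) and the simulated failure are hypothetical; the real change works with Elasticsearch's `ShardId`s, listeners and the `REFRESH`/`FLUSH` threadpools, and throttles asynchronously rather than blocking the dispatch loop, but the intended structure is similar: shard IDs generated on the fly, a semaphore-style limit of `#(data nodes) * 10`, counts tracked directly, and failures collected once in a plain `ArrayList`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BroadcastSketch {

    record ShardRef(String index, int shardId) {}

    record ShardFailure(ShardRef shard, Exception cause) {}

    public static void main(String[] args) throws Exception {
        // Hypothetical inputs: shard counts per target index and the number of data nodes.
        Map<String, Integer> shardsPerIndex = Map.of("index-a", 3, "index-b", 2);
        int dataNodes = 2;

        ExecutorService executor = Executors.newFixedThreadPool(4); // stands in for the REFRESH/FLUSH pool
        Semaphore throttle = new Semaphore(dataNodes * 10);         // concurrency limit: #(data nodes) * 10

        AtomicInteger total = new AtomicInteger();
        AtomicInteger successful = new AtomicInteger();
        List<ShardFailure> failures = new ArrayList<>();            // plain ArrayList, guarded by its own lock
        List<CompletableFuture<Void>> pending = new ArrayList<>();

        // Iterate the target indices and generate shard IDs on the fly; no up-front list of all shards.
        for (var entry : shardsPerIndex.entrySet()) {
            for (int shard = 0; shard < entry.getValue(); shard++) {
                ShardRef ref = new ShardRef(entry.getKey(), shard);
                total.incrementAndGet();
                throttle.acquire(); // simple blocking throttle; the real change throttles asynchronously
                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                    try {
                        // Placeholder per-shard operation; one shard fails to exercise the failure path.
                        if (ref.index().equals("index-b") && ref.shardId() == 1) {
                            throw new RuntimeException("simulated shard failure");
                        }
                        successful.incrementAndGet();
                    } catch (Exception e) {
                        synchronized (failures) {
                            failures.add(new ShardFailure(ref, e)); // stored in final form, no re-wrapping later
                        }
                    } finally {
                        throttle.release();
                    }
                }, executor);
                pending.add(future);
            }
        }

        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);

        // Keep only the counts and the failures, mirroring the ReplicationResponse.ShardInfo summary.
        System.out.printf("total=%d successful=%d failed=%d%n", total.get(), successful.get(), failures.size());
    }
}
```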

@DaveCTurner added the `>bug`, `:Distributed Coordination/Network` (Http and internode communication implementations) and `v8.7.0` labels on Jan 6, 2023
@elasticsearchmachine

Hi @DaveCTurner, I've created a changelog YAML for you.

import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;

public class ThrottledIterator<T> implements Releasable {

This, and some of the other utilities introduced here, are extracted from #92373.
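
Since the `ThrottledIterator` itself lives in #92373, here is only a rough, hypothetical sketch of the underlying technique implied by the imports above: a `Semaphore` bounds how many items are in flight, and each item's completion callback releases a permit and pulls the next item. The signatures, the `Runnable` completion handle (standing in for a `Releasable`), and the recursion caveat are simplifications, not the real class.

```java
import java.util.Iterator;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;

// Rough sketch of the throttling technique, not the actual utility from #92373.
class ThrottledIteratorSketch<T> {
    private final Iterator<T> iterator;
    private final BiConsumer<Runnable, T> itemConsumer; // assumed shape; the real class uses a Releasable-style handle
    private final Semaphore permits;

    ThrottledIteratorSketch(Iterator<T> iterator, BiConsumer<Runnable, T> itemConsumer, int maxConcurrency) {
        this.iterator = iterator;
        this.itemConsumer = itemConsumer;
        this.permits = new Semaphore(maxConcurrency);
    }

    void run() {
        while (permits.tryAcquire()) {
            final T item;
            synchronized (iterator) {
                if (iterator.hasNext() == false) {
                    permits.release();
                    return;
                }
                item = iterator.next();
            }
            // The consumer must invoke the completion handle exactly once. Note: a consumer that
            // completes synchronously would recurse via onItemCompleted; the real utility avoids
            // that, this sketch does not.
            itemConsumer.accept(this::onItemCompleted, item);
        }
    }

    private void onItemCompleted() {
        permits.release();
        run(); // a permit is free again, try to start more items
    }
}
```

In this PR the items would be the per-shard requests and the limit would be `#(data nodes) * 10`.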

Comment on lines 73 to 75
protected ClusterBlockLevel globalBlockLevel() {
    return ClusterBlockLevel.METADATA_READ;
}

Surprised this wasn't checked already; today we just return a trivial success before state recovery:

{
  "_shards" : {
    "total" : 0,
    "successful" : 0,
    "failed" : 0
  }
}

We could leave this as-is ofc.
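
For reference, a sketch of the conventional way such a global block level gets applied to the cluster state; this is the general pattern used by other transport actions, shown only to illustrate the effect of returning `METADATA_READ`, not necessarily the exact call site in `TransportBroadcastReplicationAction`:

```java
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;

class GlobalBlockCheckSketch {
    // With METADATA_READ in play, a refresh/flush arriving before state recovery (while the
    // state-not-recovered block is installed) would be rejected with a ClusterBlockException
    // instead of returning the trivial "0 shards" success shown above.
    static void checkGlobalBlock(ClusterState state) {
        ClusterBlockException blockException = state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
        if (blockException != null) {
            throw blockException;
        }
    }
}
```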

Comment on lines 69 to 71
protected ClusterBlockLevel globalBlockLevel() {
    return ClusterBlockLevel.METADATA_READ;
}

Surprised this wasn't checked already; today we just return a trivial success before state recovery:

{
  "_shards" : {
    "total" : 0,
    "successful" : 0,
    "failed" : 0
  }
}

We could leave this as-is ofc.

}

protected void shardExecute(Task task, Request request, ShardId shardId, ActionListener<ShardResponse> shardActionListener) {
    // assert Transports.assertNotTransportThread("per-shard requests might be high-volume"); TODO Yikes!

See #92730
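
A small, hypothetical sketch of the forking described in the PR (the helper name is made up and the real change wires this through the action's existing listener plumbing): hand the whole fan-out to the `REFRESH` (or `FLUSH`) executor so the per-shard dispatch never runs on a transport thread, which is what the commented-out assertion above would enforce.

```java
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transports;

class ForkToRefreshPoolSketch {
    // Fork the potentially high-volume per-shard fan-out onto the REFRESH executor so it
    // never runs on a transport thread (see #92730).
    static void forkFanOut(ThreadPool threadPool, Runnable fanOut) {
        threadPool.executor(ThreadPool.Names.REFRESH).execute(() -> {
            assert Transports.assertNotTransportThread("per-shard fan-out may be high-volume");
            fanOut.run();
        });
    }
}
```

In real code the forked task would report failures back through an `ActionListener` rather than letting exceptions escape to the executor.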

DaveCTurner added a commit to DaveCTurner/elasticsearch that referenced this pull request Jan 13, 2023
@DaveCTurner

Closing this for now; I opened #92902 with the more obviously-correct bits of this change.

elasticsearchmachine pushed a commit that referenced this pull request Jan 13, 2023
BroadcastReplicationAction derivatives (`POST /<indices>/_refresh` and
`POST /<indices>/_flush`) are pretty inefficient when targeting high
shard counts due to how `TransportBroadcastReplicationAction` works:

- It computes the list of all target shards up-front on the calling (transport) thread.

- It accumulates responses in a `CopyOnWriteArrayList` which takes quadratic work to populate, even though nothing reads this list until it's fully populated.

- It then mostly discards the accumulated responses, keeping only the total number of shards, the number of successful shards, and a list of any failures.

- Each failure is wrapped up in a `ReplicationResponse.ShardInfo.Failure` but then unwrapped at the end to be re-wrapped in a `DefaultShardOperationFailedException`.

This commit fixes all this:

- The computation of the list of shards, and the sending of the per-shard requests, now happens on the relevant threadpool (`REFRESH` or `FLUSH`) rather than a transport thread.

- The failures are tracked in a regular `ArrayList`, avoiding the accidentally-quadratic complexity.

- Rather than accumulating the full responses for later processing we track the counts and failures directly.

- The failures are tracked in their final form, skipping the unwrap-and-rewrap step at the end.

Relates #77466
Relates #92729
